Exchange Rates Ingestion Pipeline

Jan 2024 ~ Flo Energy

Length:   1mo (at 0.5 FTE)

Programming language:   Python (os, datetime, pytz, snowflake-connector-python, Pandas, urllib3, Dagster)

Data:   Daily SGD exchange rates provided by the Monetary Authority of Singapore (MAS)

Problem description:
Orchestrate a pipeline that retrieves the latest exchange rates through an API, loads them into an Amazon S3 bucket, and then copies them into a Snowflake table.

Approach & Results:
The image below shows the architecture of the ingestion pipeline. First, a Python connector queries the raw Snowflake table for the most recent date that has an exchange rate. This date and the execution date then define the date range for which SGD exchange rates are requested from the MAS API. The rates are fetched into a Pandas DataFrame, exported as a .csv file to the Amazon S3 bucket that serves as the data lake, and finally copied into the raw Snowflake table.
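The date-range step can be sketched roughly as follows. This is a minimal illustration, not the project's actual code: the function name `missing_date_range` is hypothetical, and it assumes the range starts on the day after the last loaded date and ends on the execution date, which the description above does not state explicitly.

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def missing_date_range(last_loaded: date, run_date: date) -> Optional[Tuple[date, date]]:
    """Return the (start, end) dates still to be fetched, or None if up to date.

    Assumption: the day after the last loaded date is the first missing day,
    and the execution date is the last day to request.
    """
    start = last_loaded + timedelta(days=1)
    if start > run_date:
        # The table already holds a rate for the execution date (or later).
        return None
    return start, run_date


# Example: the table was last loaded on a Friday and the job runs on Monday.
print(missing_date_range(date(2024, 1, 12), date(2024, 1, 15)))
```

Returning `None` when nothing is missing lets the downstream steps skip the API call instead of requesting an empty or inverted range.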

To guard against erroneous API responses, a retry policy repeats a failing op up to three times, with a ten-minute delay between attempts. Each pipeline step was implemented as a Dagster software-defined asset, and the assets were linked in a Dagster job. The job is scheduled to run every working day shortly after MAS publishes the exchange rates, so the latest rates are retrieved and ingested into the data warehouse automatically, with minimal human supervision.
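The retry behaviour can be illustrated in plain Python. The sketch below is not the Dagster mechanism itself; it only mimics the described semantics (up to three retries, ten minutes apart) with a hypothetical helper `run_with_retries`. In Dagster, the equivalent would typically be declared with `RetryPolicy(max_retries=3, delay=600)` on the op or asset, though the exact configuration used in the project is an assumption.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    step: Callable[[], T],
    max_retries: int = 3,          # retries after the first failed attempt
    delay_seconds: float = 600.0,  # ten minutes between attempts
    sleep: Callable[[float], None] = time.sleep,  # injectable for testing
) -> T:
    """Run `step`, retrying on any exception up to `max_retries` times."""
    retries_used = 0
    while True:
        try:
            return step()
        except Exception:
            if retries_used >= max_retries:
                # All retries exhausted: surface the last error to the caller.
                raise
            retries_used += 1
            sleep(delay_seconds)
```

Passing `sleep` as a parameter keeps the helper testable without actually waiting ten minutes; a real pipeline would leave the default in place.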

FX Rates Ingestion Diagram
